WebSocketClient.on   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 11
rs 9.85
c 0
b 0
f 0
cc 2
1
import './types'
2
import { Zipnum } from 'zipnum'
3
import { add_event, rm_event, sett } from './utils'
4
import { processConfig } from './config'
5
import { AnyFunc, F, once, qfilter, T } from 'pepka'
6
7
const MAX_32 = 2**31 - 1
8
const zipnum = new Zipnum()
9
10
type EventHandler<T extends keyof WebSocketEventMap> = AnyFunc<any, [WebSocketEventMap[T]]>
11
type EventHandlers = {
12
  open: EventHandler<'open'>[]
13
  close: EventHandler<'close'>[]
14
  error: EventHandler<'error'>[]
15
  message: AnyFunc<any, [WebSocketEventMap['message'] & {data: any}]>[]
16
  timeout: AnyFunc<any, [data: any]>[]
17
}
18
19
class WebSocketClient {
20
  private open = false
21
  private ws: wsc.Socket|null = null
22
  private forcibly_closed = false
23
  private reconnect_timeout: NodeJS.Timeout|null = null
24
  private queue: Record<string, wsc.Message> = {}
25
  private onReadyQueue: AnyFunc[] = []
26
  private onCloseQueue: AnyFunc[] = []
27
  private handlers: EventHandlers = { open: [], close: [], message: [], error: [], timeout: [] }
28
  private config = <wsc.Config>{}
29
30
  private init_flush(): void {
31
    // TODO: reject them or save somehow ?..
32
    qfilter(F, this.queue)
33
  }
34
  private call(event_name: wsc.WSEvent, ...args: any[]) {
35
    // this.handlers.open[0]()
36
    for(const h of this.handlers[event_name]) h(...args)
37
  }
38
39
  private log(event: string, message: any = null, time: number|null = null): void {
40
    const config = this.config
41
    if(time !== null) {
42
      config.log(event, time, message)
43
    } else {
44
      if(config.timer) {
45
        config.log(event, null, message)
46
      } else {
47
        config.log(event, message)
48
      }
49
    }
50
  }
51
52
  private initSocket(ws: wsc.Socket) {
53
    const {queue, config} = this
54
    this.open = true
55
    this.ws = ws
56
    this.onReadyQueue.forEach((fn: Function) => fn())
57
    this.onReadyQueue.splice(0)
58
    const {id_key, data_key} = config.server
59
    // Works also on previously opened sockets that do not fire 'open' event.
60
    this.call('open', ws)
61
    for(const msg_id in queue) ws.send(queue[msg_id].msg)
62
    if(this.reconnect_timeout !== null) {
63
      clearInterval(this.reconnect_timeout)
64
      this.reconnect_timeout = null
65
    }
66
    if(config.ping) {
67
      const ping_interval = setInterval(() => {
68
        if(this.open) this.send(config.ping.content)
69
        if(this.forcibly_closed) clearInterval(ping_interval)
70
      }, config.ping.interval*1e3)
71
    }
72
    add_event(ws, 'close', async (...e) => {
73
      this.log('close')
74
      this.open = false
75
      this.ws = null
76
      this.onCloseQueue.forEach((fn: Function) => fn())
77
      this.onCloseQueue.splice(0)
78
      this.call('close', ...e)
79
      // Auto reconnect.
80
      const reconnect = config.reconnect
81
      if(
82
        typeof reconnect === 'number' &&
83
        !isNaN(reconnect) &&
84
        !this.forcibly_closed
85
      ) {
86
        const reconnectFunc = async () => {
87
          this.log('reconnect')
88
          if(this.ws !== null) {
89
            this.ws.close()
90
            this.ws = null
91
          }
92
          // If some error occured, try again.
93
          const status = await this.connect()
94
          if(status !== null)
95
            this.reconnect_timeout = setTimeout(reconnectFunc, reconnect * 1000)
96
        }
97
        // TODO: test normal close by server. Would it be infinite ?
98
        reconnectFunc()
99
      }
100
      // reset the flag to reuse.
101
      this.forcibly_closed = false
102
    })
103
    add_event(ws, 'message', (e) => {
104
      try {
105
        const data = config.decode(e.data)
106
        this.call('message', {...e, data})
107
        if(data[id_key]) {
108
          const q = this.queue[data[id_key]]
109
          if(q) {
110
            // Debug, Log.
111
            const time = q.sent_time ? (Date.now() - q.sent_time) : null
112
            this.log('message', data[data_key], time)
113
            // Play.
114
            q.ff(data[data_key])
115
          }
116
        }
117
      } catch (err) {
118
        console.error(err, `WSP: Decode error. Got: ${e.data}`)
119
      }
120
    })
121
  }
122
123
  private connect() { // returns status if won't open or null if ok.
124
    return new Promise((ff) => {
125
      if(this.open === true) {
126
        return ff(null)
127
      }
128
      const config = this.config
129
      const ws = config.socket || config.adapter(config.url, config.protocols)
130
      if(!ws || ws.readyState > 1) {
131
        this.ws = null
132
        this.log('error', 'ready() on closing or closed state! status 2.')
133
        return ff(2)
134
      }
135
      const ffo = once(ff)
136
      add_event(ws, 'error', once((e) => {
137
        this.log('error', 'status 3. Err: '+e.message)
138
        this.call('error', e)
139
        this.ws = null
140
        // Some network error: Connection refused or so.
141
        ffo(3)
142
      }))
143
      // Because 'open' won't be envoked on opened socket.
144
      if(ws.readyState) {
145
        this.initSocket(ws)
146
        ffo(null)
147
      } else {
148
        add_event(ws, 'open', once(() => {
149
          this.log('open')
150
          this.initSocket(ws)
151
          ffo(null)
152
        }))
153
      }
154
    })
155
  }
156
  public get socket() { return this.ws }
157
  public async ready() {
158
    return new Promise<void>((ff) => {
159
      if(this.open) ff()
160
      else this.onReadyQueue.push(ff)
161
    })
162
  }
163
  public on(
164
    event_name: wsc.WSEvent,
165
    handler: (data: any) => any,
166
    predicate: (data: any) => boolean = T,
167
    raw = false
168
  ) {
169
    const _handler: wsc.EventHandler = (event) =>
170
      predicate(event) && handler(event)
171
    if(raw) add_event(this.ws as wsc.Socket, event_name, _handler)
172
    else this.handlers[event_name].push(_handler)
173
    return _handler
174
  }
175
  public off(
176
    event_name: wsc.WSEvent,
177
    handler: (data: any) => any,
178
    raw = false
179
  ) {
180
    if(raw) return rm_event(this.ws as wsc.Socket, event_name, handler)
181
    const handlers = this.handlers[event_name]
182
    const i = handlers.indexOf(handler)
183
    if(~i) handlers.splice(i, 1)
184
  }
185
186
  public async close(): wsc.AsyncErrCode {
187
    return new Promise((ff, rj) => {
188
      if(this.ws === null) {
189
        rj('WSP: closing a non-inited socket!')
190
      } else {
191
        this.open = false
192
        this.onCloseQueue.push(() => {
193
          this.init_flush()
194
          this.ws = null
195
          this.forcibly_closed = true
196
          ff(null)
197
        })
198
        this.ws.close()
199
      }
200
    })
201
  }
202
203
  /**  .send(your_data) wraps request to server with {id: `hash`, data: `actually your data`},
204
    returns a Promise that will be rejected after a timeout or
205
    resolved if server returns the same signature: {id: `same_hash`, data: `response data`}.
206
  */
207
  public async send<RequestDataType = any, ResponseDataType = any>(
208
    message_data: RequestDataType,
209
    opts = <wsc.SendOptions>{}
210
  ): Promise<ResponseDataType> {
211
    this.log('send', message_data)
212
    const {config, ws, forcibly_closed, queue} = this
213
    const message  = {}
214
    const data_key = config.server.data_key
215
216
    const message_id = zipnum.zip((Math.random()*(MAX_32-10))|0)
217
    if(typeof opts.top === 'object') {
218
      if(opts.top[data_key]) {
219
        throw new Error('Attempting to set data key/token via send() options!')
220
      }
221
      Object.assign(message, opts.top)
222
    }
223
    config.pipes.forEach((pipe) => message_data = pipe(message_data))
224
225
    if(forcibly_closed)
226
      throw new Error('Attempting to send via closed WebSocket connection!')
227
    if(!this.open) this.connect()
228
    const msg = await config.encode(message_id, message_data, config)
229
    if(ws?.readyState===1) // The only opened state.
230
      ws.send(msg)
231
232
    return new Promise((ff, rj) => {
233
      this.queue[message_id] = {
234
        msg, ff(x: any) {
235
          ff(x)
236
          // cleanup.
237
          clearTimeout(this.timeout) // from this object!
238
          delete queue[message_id]
239
        },
240
        data_type: config.data_type,
241
        sent_time: config.timer ? Date.now() : null,
242
        timeout: sett(config.timeout, () => {
243
          if(this.queue[message_id]) {
244
            this.call('timeout', message_data)
245
            rj({
246
              'Websocket timeout expired: ': config.timeout,
247
              'for the message ': message_data
248
            })
249
            delete queue[message_id]
250
          }
251
        })
252
      }
253
    })
254
  }
255
256
  // TODO: Add .on handlers to config!
257
  constructor(user_config: wsc.UserConfig = {}) {
258
    this.config = processConfig(user_config)
259
    this.init_flush()
260
    if(!this.config.lazy) this.connect()
261
  }
262
}
263
264
/* TODO: v3: @.deprecated. Use named import { WebSocketClient } instead. */
265
export default WebSocketClient